-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Add Top-N pushdown support for the MongoDB connector #26504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
8973aee to
0331d72
Compare
|
The pull request is ready for review. Thanks! |
| } | ||
|
|
||
| @Override | ||
| public Optional<TopNApplicationResult<ConnectorTableHandle>> applyTopN(ConnectorSession session, ConnectorTableHandle table, long topNCount, List<SortItem> sortItems, Map<String, ColumnHandle> assignments) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please put each variable on a new line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review!
Fixed.
| sortDocument.append(columnName, direction); | ||
| } | ||
| List<MongoTableSort> tableSortList = handle.sort().orElse(new ArrayList<>()); | ||
| MongoTableSort tableSort = new MongoTableSort(sortDocument, sortNullFieldsDocument == null ? Optional.empty() : Optional.of(sortNullFieldsDocument), toIntExact(topNCount)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use Optional.ofNullable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
| if (sortItem.getSortOrder() == SortOrder.ASC_NULLS_LAST || sortItem.getSortOrder() == SortOrder.DESC_NULLS_FIRST) { | ||
| String sortColumnName = "_sortNulls_" + columnName; | ||
| Document condition = new Document(); | ||
| condition.append("$cond", List.of(new Document("$eq", new ArrayList<>(Arrays.asList("$" + columnName, null))), 1, 0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| condition.append("$cond", List.of(new Document("$eq", new ArrayList<>(Arrays.asList("$" + columnName, null))), 1, 0)); | |
| condition.append("$cond", ImmutableList.of(new Document("$eq", Arrays.asList("$" + columnName, null)), 1, 0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
| Document sortNullFieldsDocument = null; | ||
| for (SortItem sortItem : sortItems) { | ||
| String columnName = sortItem.getName(); | ||
| int direction = (sortItem.getSortOrder() == SortOrder.ASC_NULLS_FIRST || sortItem.getSortOrder() == SortOrder.ASC_NULLS_LAST) ? 1 : -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we give 1 and -1 a meaningful name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
| .toList(); | ||
| builder.append(", orderBy = ").append(sortEntries); | ||
| if (sortNullFields.isPresent()) { | ||
| List<String> nullFields = sortNullFields.get().keySet().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ImmutableList.copyOf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
| for (String sortField : tableSort.sort().keySet()) { | ||
| // Sorting on the field does not work unless we add it to the projection | ||
| if (!projection.containsKey(sortField)) { | ||
| projection.append(sortField, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, it's not obvious for the meaning of 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
ea62cea to
f1754ea
Compare
| public Optional<TopNApplicationResult<ConnectorTableHandle>> applyTopN( | ||
| ConnectorSession session, | ||
| ConnectorTableHandle table, | ||
| long topNCount, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the topNCount is more than range of integer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added check similar to applyLimit method
// MongoDB doesn't support topN number greater than integer max
if (topNCount > Integer.MAX_VALUE) {
return Optional.empty();
}```
| MongoColumnHandle columnHandle = (MongoColumnHandle) columnHandleEntry.getValue(); | ||
| if (!columnHandleEntry.getKey().equals(columnHandle.baseName())) { | ||
| // We don't support complex nested queries | ||
| return Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about for dbRef column type ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand it, applyTopN is called only for one table, no JOIN. The dbRef from MongoDB would be a reference to another collection, what we achieve with a JOIN. If it's something else, please clarify.
I wrote a simple test, and the execution doesn't enter the applyTopN method.
* Test topN dbRef.
*/
@Test
public void testJoinTopNDbRef() {
Session session = Session.builder(getSession())
.setSystemProperty(IGNORE_STATS_CALCULATOR_FAILURES, "false")
.build();
assertQuery(
session,
"SELECT n.name, c.name " +
"FROM nation n " +
"JOIN customer c ON c.nationkey = n.nationkey " +
"ORDER BY n.nationkey, c.name LIMIT 2");
}```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is the case, we could simply add a defense check to Make sure the columnHandle#dbRefField is false, otherwise return Optional.empty() to indicate we don't support it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AbstractTestQueries.testPredicate() fails.
@Test
public void testPredicate()
{
assertQuery("" +
"SELECT *\n" +
"FROM (\n" +
" SELECT orderkey+1 AS a FROM orders WHERE orderstatus = 'F' UNION ALL \n" +
" SELECT orderkey FROM orders WHERE orderkey % 2 = 0 UNION ALL \n" +
" (SELECT orderkey+custkey FROM orders ORDER BY orderkey LIMIT 10)\n" +
") \n" +
"WHERE a < 20 OR a > 100 \n" +
"ORDER BY a");
}
columnHandle.dbRefField is false.
However,
columnHandle.baseName() is "orderkey"
columnHandleEntry.getKey() is "orderkey_12"
So I used this condition to avoid the pushdown for this test case also.
|
involve #26521, we should wait that to cover the basic test |
|
This pull request has gone a while without any activity. Ask for help on #core-dev on Trino slack. |
| MongoColumnHandle columnHandle = (MongoColumnHandle) columnHandleEntry.getValue(); | ||
| if (!columnHandleEntry.getKey().equals(columnHandle.baseName())) { | ||
| // We don't support complex nested queries | ||
| return Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is the case, we could simply add a defense check to Make sure the columnHandle#dbRefField is false, otherwise return Optional.empty() to indicate we don't support it
| Optional<String> filter, | ||
| TupleDomain<ColumnHandle> constraint, | ||
| Set<MongoColumnHandle> projectedColumns, | ||
| Optional<List<MongoTableSort>> sort, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it requires List instead of Optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a single MongoTableSort is used,
BaseConnectorTest.testTopNPushdown() test fails.
// TopN over TopN
assertThat(query("SELECT orderkey, totalprice FROM (SELECT orderkey, totalprice FROM orders ORDER BY 1, 2 LIMIT 10) ORDER BY 2, 1 LIMIT 5"))
.ordered()
.isFullyPushedDown();
Expected :[[2, 38426.09], [6, 45523.1], [4, 56000.91], [34, 73315.48], [5, 105367.67]]
Actual :[[35271, 874.89], [28647, 924.33], [58145, 929.03], [8354, 974.04], [37415, 986.63]]
Also, AbstractTestQueries.testTopN() fails for similar issue.
// TopN over TopN
assertQueryOrdered("SELECT orderkey, totalprice FROM (SELECT orderkey, totalprice FROM orders ORDER BY 1, 2 LIMIT 10) ORDER BY 2, 1 LIMIT 5");
The nested SELECT will produce a MongoTableSort.
The root SELECT will need to aggregate the MongoTableSort entities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, got it, so it is not "entire table sort", it is actual one sort item, we could rename it similar to "MongoSortItem"
It seems not, I needs to take a look at this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is for the case handle push topN into a table already contains topN info
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
Should I do the rename to MongoSortItem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chenjian2664
Yes.
Should I do the rename to MongoSortItem?
Add Top-N (ORDER BY + LIMIT) pushdown support for the MongoDB connector. MongoDB considers null values to be less than any other value. We can handle sort items with SortOrder.ASC_NULLS_LAST or SortOrder.DESC_NULLS_FIRST with a MongoDB aggregation pipeline, where we add computed fields to sort correctly. We add sort fields to the projection, so that they are considered by MongoDB. We add Top-N pushdown support only for simple nested queries (that work on the same collection).
|
|
||
| sortDocument.append(columnName, direction); | ||
| } | ||
| List<MongoTableSort> tableSortList = handle.sort().orElse(new ArrayList<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider using ImmutableList.Builder
| handle.constraint(), | ||
| handle.projectedColumns(), | ||
| Optional.of(tableSortList), | ||
| OptionalInt.empty()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's safer to put handle.limit()
| List<Document> aggregateList = new ArrayList<>(); | ||
| aggregateList.add(new Document("$match", filter)); | ||
| for (MongoTableSort tableSort : tableSortList) { | ||
| tableSort.sortNullFields().ifPresent(sortNullFields -> new Document("$addFields", sortNullFields)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line seems not doing anything?
| } | ||
| } | ||
|
|
||
| List<Document> aggregateList = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use ImmuableList and rename the aggregateList to aggregatePipeline
Description
Add Top-N (ORDER BY + LIMIT) pushdown support for the MongoDB connector.
Additional context and related issues
MongoDB considers null values to be less than any other value. We can handle sort items with SortOrder.ASC_NULLS_LAST or SortOrder.DESC_NULLS_FIRST with a MongoDB aggregation pipeline, where we add computed fields to sort correctly.
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
(X ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: